MQTTに送ったメッセージをAmazon Kinesisにプロキシさせる
はじめに
MQTT(MQ Telemetry Transport) というメッセージングプロトコルが IoT/M2M 界隈で注目されてきています。MQTTには
- HTTPに比べてプロトコルオーバーヘッドが小さい
- 到達可能性を細かく制御できる
- Pub/Sub メッセージが手軽に扱える
といった特徴があります。 昨年末から販売されている超小型コンピュータ Intel Edison では初期状態から MQTT のライブラリがインストールされているなど、MQTT を利用する敷居が下がりつつあります。
IoTデバイスがMQTTブローカーにデータ送信し、Amazon Kinesis でリアルタイムでデータ処理する、というようなユースケースは今後どんどん増えてくるのではないでしょうか。
本ブログの狙い
本ブログではmqtt-kinesis-bridge を利用し、MQTT ブローカーに送信されたメッセージを Amazon Kinesis にプロキシして取り込む方法を紹介します。
AWS Labas が公開している mqtt-kinesis-bridge
は MQTT と Amazon Kinesis を連携する IoT の手段としてまれに紹介されますが、どういうわけか、ちゃんとインストールしたブログをあまり見かけないので、人柱になって動かしてみた次第です。
以下の流れでデータ連携の動作確認をします。
- MQTT ブローカー mosquitto をインストールし、pub/sub を確認
mqtt-kinesis-bridge
のベースになっている MQTT ライブラリの Paho を利用して pub/sub を確認- MQTT ブローカーへ publish したメッセージを
mqtt-kinesis-bridge
で Kinesis にプロキシし、AWS CLI で Kinesis からレコードを受信
検証環境は Amazon Linux AMI 2015.03 とします。
MQTT mosquitto で pub/sub してみる
まずは MQTT の OSS 実装である mosquitto
をインストールし、pub/sub してみます。
MQTT mosquitto のインストール
CentOS 6 向けのパッケージが用意されているため、CentOS 6 向けと同じく yum レポジトリを追加して Amazon Linux にインストールします。
$ sudo curl http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/home:oojah:mqtt.repo -o /etc/yum.repos.d/mqtt.repo % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 271 100 271 0 0 476 0 --:--:-- --:--:-- --:--:-- 476 $ cat /etc/yum.repos.d/mqtt.repo [home_oojah_mqtt] name=mqtt (CentOS_CentOS-6) type=rpm-md baseurl=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/ gpgcheck=1 gpgkey=http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-6/repodata/repomd.xml.key enabled=1 $ yum search mosquitto Loaded plugins: priorities, update-motd, upgrade-helper ================================================ N/S matched: mosquitto ====================================================================== mosquitto-clients.i686 : Mosquitto command line publish/subscribe clients mosquitto-clients.x86_64 : Mosquitto command line publish/subscribe clients mosquitto-debuginfo.i686 : Debug information for package mosquitto mosquitto-debuginfo.x86_64 : Debug information for package mosquitto libmosquitto-devel.i686 : MQTT C client library development files libmosquitto-devel.x86_64 : MQTT C client library development files libmosquitto1.i686 : MQTT C client library libmosquitto1.x86_64 : MQTT C client library libmosquittopp-devel.i686 : MQTT C++ client library development files libmosquittopp-devel.x86_64 : MQTT C++ client library development files libmosquittopp1.i686 : MQTT C++ client library libmosquittopp1.x86_64 : MQTT C++ client library mosquitto.i686 : MQTT version 3.1/3.1.1 compatible message broker mosquitto.x86_64 : MQTT version 3.1/3.1.1 compatible message broker Name and summary matches only, use "search all" for everything.
$ yum search
の実行結果からわかるように、mosquitto はサーバーmosquitto
とクライアントmosquitto-client
の両方のアプリケーションを提供しているので、これらをインストールします。
$ sudo yum install -y mosquitto-clients mosquitto
mosquitto サーバ(MQTT ブローカー)の起動
mosquitto は自動起動する設定になっていないため chkconfig --add
します。
$ sudo chkconfig --add mosquitto $ sudo chkconfig mosquitto on $ sudo chkconfig mosquitto --list mosquitto 0:off 1:off 2:on 3:on 4:on 5:on 6:off
mosquitto サーバを起動します。
$ sudo service mosquitto start Starting Mosquitto MQTT broker [ OK ] # 1439606245: mosquitto version 1.4.2 (build date 2015-06-10 13:52:20+0000) starting 1439606245: Config loaded from /etc/mosquitto/mosquitto.conf. 1439606245: Opening ipv4 listen socket on port 1883. 1439606245: Opening ipv6 listen socket on port 1883.
mosquitto はデフォルトでは 1883 番ポートで LISTEN
します。
mosquitto クライアントから MQTT をしゃべってみる
MQTT ブローカーとの通信には、クライアントプログラム
- publisher(
mosquitto_pub
) - subscribe(
mosquitto_sub
)
を利用します。
MQTT ブローカーに subscribe する
mosquitto_sub
で MQTT ブローカーに subscribe します。
subscribe するトピックは -t
オプションで指定します。
検証目的のため、デバッグオプション(-d
)もつけて接続します。
$ mosquitto_sub -d -t sensors/temperature Client mosqsub/7721-ip-172-31- sending CONNECT Client mosqsub/7721-ip-172-31- received CONNACK Client mosqsub/7721-ip-172-31- sending SUBSCRIBE (Mid: 1, Topic: sensors/temperature, QoS: 0) Client mosqsub/7721-ip-172-31- received SUBACK Subscribed (mid: 1): 0
MQTT ブローカーに publish する
次に mosquitto_pub
で MQTT ブローカーにメッセージを publish します。
新規ターミナルを開き、トピック(-t
)とメッセージ(-m
)を指定します。
気温が32.5℃であれば、センサーからは以下の様なメッセージを送りたくなります。
$ mosquitto_pub -t sensors/temperature -m "32.5" 1439606534: New connection from 127.0.0.1 on port 1883. 1439606534: New client connected from 127.0.0.1 as mosqpub/7722-ip-172-31- (c1, k60). 1439606534: Client mosqpub/7722-ip-172-31- disconnected.
このとき mosquitto_sub
のターミナルには以下のようなメッセージが表示されるはずです。
Client mosqsub/7721-ip-172-31- received PUBLISH (d0, q0, r0, m0, 'sensors/temperature', ... (4 bytes)) 32.5
publish されたメッセージ 32.5
を受け取れました。
トピックをまとめて subscribe
今回は subscribe するメッセージを sensors/temperature
と固定しましたが、 sensors/
で始まるトピック全般を subscribe したいときは sensors/+
のように +
を接尾します。
$ mosquitto_sub -d -t sensors/+
この状態で
$ mosquitto_pub -t sensors/temperature -m "33.5" $ mosquitto_pub -t sensors/humidity -m "78.9"
と publish すると subscriber には
... Client mosqsub/30662-ip-172-31 received PUBLISH (d0, q0, r0, m0, 'sensors/temperature', ... (4 bytes)) 33.5 ... Client mosqsub/30662-ip-172-31 received PUBLISH (d0, q0, r0, m0, 'sensors/humidity', ... (4 bytes)) 78.9
というように sensors/temperature
と sensors/humidity
という sensors/
系トピックのメッセージがまるっと送信されます。
Paho ライブラリで pub/sub してみる
Paho は Eclipse Foundation が中心になって開発する OSS の MQTT ライブラリです。 本ブログの最終ゴールである mqtt-kinesis-bridge はこの Python バインディングをベースにしています。 Paho の Python 実装を利用して pub/sub してみます。
MQTT ブローカーには引き続き mosquitto を利用します。
paho のインストール
Python ライブラリのためパッケージ管理ツール pip
でパッケージ paho-mqtt
をインストールします。
$ sudo pip install --upgrade paho-mqtt
paho-mqtt
ライブラリの使い方はドキュメントやサンプロコードレポジトリを参考にしてください。
ライブラリは素直な作りで、 on_xxx
とコールバックをフックする仕組みが備わっています。
MQTT サーバに subscribe する
プログラムを最小にするために、トピックやMQTT ブローカーはプログラム決め打ちとします。
まずは subscriber(sub.py) です。
# mqtt subscriber # sub.py import paho.mqtt.client as mqtt # The callback for when the client receives a CONNACK response from the server. def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) # Subscribing in on_connect() means that if we lose the connection and # reconnect then subscriptions will be renewed. client.subscribe(topic="sensors/temperature") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): print("on_message:" + msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect(host="localhost", port=1883) client.loop_forever()
MQTT サーバに publish する
次に publisher(pub.py) です。 こちらもトピック、メッセージ、MQTT ブローカーともにプログラム内に決め打ちします。
# mqtt publisher # pub.py import paho.mqtt.publish as publish publish.single(topic="sensors/temperature", payload="28.5", hostname="localhost", port=1883)
Paho から pub/sub の実行
次のコマンドで subscribe します。
$ python sub.py Connected with result code 0
別ターミナルから次のコマンドで publish します。
$ python pub.py
subscriber のターミナルには次のようにメッセージ表示されれば成功です。
$ python sub.py Connected with result code 0 on_message:sensors/temperature 28.5
mqtt-kinesis-bridge で MQTT メッセージを Kinesis にプロキシさせる
最後に MQTT-Kinesis ブリッジ mqtt-kinesis-bridge
を使って MQTT に publish したメッセージを Amazon kinesis にプロキシします。
Amazon Kinesis Stream の用意
まずは Kinesis Stream を作成します。
$ aws kinesis create-stream --stream-name Foo --shard-count 1
mqtt-kinesis-bridge の起動
mqtt-kinesis-bridge
はパッケージとして綺麗に切りだされていないため git でソースコードを持ってきます。
$ sudo yum install -y git jq $ git clone https://github.com/awslabs/mqtt-kinesis-bridge.git $ cd mqtt-kinesis-bridge
Kinesis との通信(PutRecord
)には Python の AAWS SDK boto を利用しているため pip でインストールします。
$ sudo pip install --upgrade boto
プロキシープログラムである bridge.py に
Kinesis Stream名 リージョン subscribe するトピックス
を指定して MQTT-Kinesis プロキシを起動します。
$ export STREAM=Foo $ python bridge.py $STREAM --region ap-northeast-1 --topic_name sensors/+ { "StreamDescription": { "HasMoreShards": false, "Shards": [ { "HashKeyRange": { "EndingHashKey": "34", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49" }, "ShardId": "shardId-000000000000" } ], "StreamARN": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/Foo", "StreamName": "Foo", "StreamStatus": "ACTIVE" } } Starting MQTT-to-Kinesis bridge Bridge Connected, looping... 1439640509: New connection from 127.0.0.1 on port 1883. 1439640509: New client connected from 127.0.0.1 as paho/4DF21AE636529C7E2F (c1, k60). Connection Msg: Subscribe topic: sensors/+ RC: (0, 1)
MQTT に publish
次に、別ターミナルからトピック sensors/humidity
にメッセージを publish します。
$ mosquitto_pub -d -t sensors/humidity -m "78.9" Client mosqpub/29001-ip-172-31 sending CONNECT Client mosqpub/29001-ip-172-31 received CONNACK Client mosqpub/29001-ip-172-31 sending PUBLISH (d0, q0, r0, m1, 'sensors/humidity', ... (4 bytes)) Client mosqpub/29001-ip-172-31 sending DISCONNECT
プロキシサーバに以下のようなメッセージが表示されていれば成功です。
on_message topic: "sensors/humidity" msg.payload: "78.9" -= put seqNum: 49553524959787167203100577550735447012172782095451553810
ログ出力されている seqNum は Amazon Kinesis の SequenceNumber
です。
Kinesis からレコードを取得
AWS CLI を使って Amazon Kinesis からレコードを取得してみましょう。
SequenceNumber
を元に ShardIterator
を取得し、レコードを取得します。
$ aws kinesis get-shard-iterator \ --stream-name Foo \ --shard-id shardId-000000000000 \ --shard-iterator-type AT_SEQUENCE_NUMBER \ --starting-sequence-number 49553524959787167203100577550735447012172782095451553810 { "ShardIterator": "012345" } $ aws kinesis get-records --shard-iterator 012345 { "Records": [ { "PartitionKey": "sensors/humidity", "Data": "NzguOQ==", "SequenceNumber": "49553524959787167203100577550735447012172782095451553810" } ], "NextShardIterator": "...", "MillisBehindLatest": 199000 }
レコードの payload(上のレスポンスでは NzguOQ==
) は base64 エンコードされています。
このデータをデコードし、 publish したメッセージと同じか確認します。
$ echo -n "NzguOQ==" | base64 -d 78.9
期待通りの値にデコードされました。
まとめ
今回は mqtt-kinesis-bridge を使って MQTT から Amazon Kinesis にデータ連携してみました。 MQTT やMQTT ライブラリの動作確認も行ったため、ブログは長くなりましたが、手元でさくっと MQTT-Kinesis 連携させる上ではお手軽なソリューションの一つです。
mqtt-kinesis-bridge は README ファイルに "A simple Python-based MQTT-to-Kinesis Bridge example" と書かれているように、あくまでモック的な位置づけであり、 実態は MQTT からのメッセージ受信のイベントに対して、実装を最大限サボって Kinesis に PutRecord しているだけであり、MQTTのQoS保証を活かして取りこぼしなく Kinesis に流すことなどは1インデントも考慮されていません。
ちょっと動かしてみる分には便利ですが、その辺りを理解した上で利用しましょう。
補足
- Kinesis ストリームの作成と EC2(作成したストリームへの操作権限ロール付き)の起動をする cloudformation
- EC2 へもろもろのインストールをする Ansible Playbook
を用意しました。
mqtt-kinesis-bridge を動かす手間を減らしたいという方がいらっしゃいましたら、お使いください。
レポジトリ https://github.com/quiver/mqtt-kinesis-bridge-ansible-playbook
cloudformation で EC2/Kinesis を起動
$ git clone git@github.com:quiver/mqtt-kinesis-bridge-ansible-playbook.git $ cd mqtt-kinesis-bridge-ansible-playbook $ vi cloudformation/parameters.json # SSH の鍵の名前やインスタンスタイプを修正 $ aws cloudformation create-stack \ --stack-name mqtt2kinesis \ --template-body file://cloudformation/kinesis-data-vis-sample-app.template.json \ --parameters file://cloudformation/parameters.json \ --capabilities=CAPABILITY_IAM
Ansible Playbook で MQTT 関連プログラムをインストール
$ cat hosts.ini ; IP address of the target EC2 instance 1.2.3.4 $ vi hosts.ini # EC2 の IP アドレスを編集 $ ansible-playbook -i hosts.ini site.yml
参考
- AWS Black Belt Techシリーズ Amazon Kinesis http://www.slideshare.net/AmazonWebServicesJapan/aws-black-belt-tech-amazon-kinesis
- GitHub の mqtt-kinesis-bridge レポジトリ https://github.com/awslabs/mqtt-kinesis-bridge
- MQTT Mosquitto http://mosquitto.org/
- MQTT Paho http://www.eclipse.org/paho/